-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19817 Move DynamicTopicClusterQuotaPublisher to metadata module #20783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
fd909db to
bdfe7b3
Compare
@JimmyWang6 please fix the build error |
|
@chia7712 Thanks for the comment, just fixed the build error, please take another look when you're free :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JimmyWang6 thanks for this patch!
...ata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
Outdated
Show resolved
Hide resolved
...ata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
Outdated
Show resolved
Hide resolved
server-common/src/main/java/org/apache/kafka/server/network/Session.java
Show resolved
Hide resolved
|
@chia7712 Thanks for your comments. All of them have been addressed, and please take another look. |
| quotaManagersProvider.clientQuotaCallbackPlugin().ifPresent(plugin -> { | ||
| if (delta.topicsDelta() != null || delta.clusterDelta() != null) { | ||
| Cluster cluster = MetadataCache.toCluster(clusterId, newImage); | ||
| if (plugin.get().updateClusterMetadata(cluster)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The necessity of this method is being discussed in KIP-1200. While I hope the vote succeeds, there is no harm in migrating the code though
| int nodeId, | ||
| FaultHandler faultHandler, | ||
| String nodeType, | ||
| QuotaManagersProvider quotaManagers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you consider de-coupling those components by using lambda function? for example:
public DynamicTopicClusterQuotaPublisher(
String clusterId,
int nodeId,
FaultHandler faultHandler,
String nodeType,
Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin,
Runnable updateQuotaMetricConfigs
) {
this.clusterId = clusterId;
this.nodeId = nodeId;
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
this.updateQuotaMetricConfigs = updateQuotaMetricConfigs;
}
@Override
public String name() {
return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" + nodeId;
}
@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
try {
if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
Cluster cluster = MetadataCache.toCluster(clusterId, newImage);
if (clientQuotaCallbackPlugin.get().updateClusterMetadata(cluster)) {
updateQuotaMetricConfigs.run();
}
}
} catch (Exception e) {
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
faultHandler.handleFault("Uncaught exception while publishing dynamic topic or cluster changes from " + deltaName, e);
}
}- we could create
DynamicTopicClusterQuotaPublisheronly if theclientQuotaCallbackPluginis existent - we wrap all callbacks in a single runnable object to avoid move many classes into
server-commonmodule
@JimmyWang6 WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712 ,
I just tried and found that clientQuotaCallbackPlugin also belongs to class ClientQuotaManager, so if we are going to de-coupe those components, the code will be shown like this:
public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher {
private final FaultHandler faultHandler;
private final String nodeType;
private final Runnable updateQuotaMetricConfigs;
public DynamicTopicClusterQuotaPublisher(
FaultHandler faultHandler,
String nodeType,
Runnable updateQuotaMetricConfigs
) {
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.updateQuotaMetricConfigs = updateQuotaMetricConfigs;
}
@Override
public String name() {
return "DynamicTopicClusterQuotaPublisher " + nodeType;
}
@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
try {
if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
updateQuotaMetricConfigs.run();
}
} catch (Exception e) {
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
faultHandler.handleFault("Uncaught exception while publishing dynamic topic or cluster changes from " + deltaName, e);
}
}
}And the logic of the original DynamicTopicClusterQuotaPublisher will still remain in the new class updateQuotaMetricConfigs somewhere within the core module. However, such a modification might deviate from the original intent of this issue. What's your perspective on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes but DynamicTopicClusterQuotaPublisher can generate the Cluster and check the result of updateClusterMetadata
see: KAFKA-19817
This PR intended to move
DynamicTopicClusterQuotaPublisherto the metadata module .To prevent circular dependencies, several associated classes have been migrated as well,including
ClientSensors,ClientQuotaMetadataManager,ThrottleCallbackandThrottledChannel.Additionally, a new interface
QuotaManagersProviderhas been introduced to avoid the need to move classQuotaManagersinQuotaFactory, as doing so would have necessitated significantly more extensive changes.